package com.wang.dmp.graphx

import com.wang.dmp.tag.{Tags4Ad, Tags4App, Tags4Area, Tags4Business, Tags4Device, Tags4KeyWords}
import com.wang.dmp.utils.{ConfigHandler, TagsHandler}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/** *
 * 借助图计算中的连通图算法解决多个渠道的用户识别问题
 * 得到当天最终的用户标签数据
 */
object Tags4Final {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("当天的最终上下文标签")
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //序列化
    val sc = new SparkContext(conf)
    val sQLContext = new SQLContext(sc)

    //读取原始数据本身
    val rawDataFrame = sQLContext.read.parquet(ConfigHandler.parquetPath)
    //读取广播变量的数据 也就是读取app数据字典
    val appDict = sc.textFile(ConfigHandler.appDictPath).map(_.split("\t", -1))
      .filter(_.length >= 5)
      .map(arr => (arr(4), arr(1)))
      .collect()
      .toMap
    //广播出去
    val appDictBT = sc.broadcast(appDict)

    //读取停用词的词典
    val stopWordDict = sc.textFile(ConfigHandler.stopwordPath).map((_, 0)).collect().toMap
    //广播出去
    val stopWordDictBT = sc.broadcast(stopWordDictBT)

    val baseRDD: RDD[(List[String], Row)] = rawDataFrame.filter(TagsHandler.hasNeedOneUserId).map(row => {
      //获取当前行所有用户的非空的ID
      val allUserIds = TagsHandler.getCurrentRowAllUserId(row)
      (allUserIds, row)
    })


    //构造点集合
    val verties: RDD[(VertexId, List[(String, Int)])] = baseRDD.flatMap(tp => {
      //获取当前行所有的用户ID
      val row = tp._2
      //用户的标签
      val adTag = Tags4Ad.makeTags(row)
      val areaTag = Tags4Area.makeTags(row)
      val deviceTag = Tags4Device.makeTags(row)
      val appTag = Tags4App.makeTags(row, appDictBT)
      val kwTag = Tags4KeyWords.makeTags(row, stopWordDictBT)
      val bsTag = Tags4Business.makeTags(row)


      //当前行的标签数据
      val currentRowTag = adTag ++ areaTag ++ deviceTag ++ appTag ++ kwTag ++ bsTag

      //list[(String,Int)] = (标签：V>0,用户的ID：0)
      val VD = tp._1.map((_, 0)) ++ currentRowTag

      //只有第一个人可以携带顶点VD的属性数据？？？如果同一行上的过个顶点都携带了属性VD,
      //同一行的数据属于同一个用户的，将来肯定汇聚和到一起，会造成重复的叠加
      tp._1.map(uId => {
        if (tp._1.head.equals(uId)) {
          (uId.hashCode.toLong, VD)
        } else {
          (uId.hashCode.toLong, List.empty)
        }
      })
    })


    //构造边集合
    val edges: RDD[Edge[Int]] = baseRDD.flatMap(tp => {
      //A B C :
      tp._1.tail.map(uId => Edge(tp._1.head.hashCode.toLong, uId.hashCode.toLong, 0))

    })

    //图对象
    val graphy = Graph(verties, edges)

    //连通图图计算  找到图中可以联通的分支，取出每个联通分支中的所有点和其分支中最小的点的元祖集合（vId,commonMinId)
    val cc = graphy.connectedComponents().vertices

    //认祖归宗
    cc.join(verties).map {
      case (uId, (commonId, tagsAndUserId)) => (commonId, tagsAndUserId)
    }.reduceByKey {
      case (list1, list2) => (list1 ++ list2).groupBy(_._1).mapValues(_.map(_._2).sum).toList
    }.foreach(println)





    //关闭
    sc.stop()
  }
}
